This code demonstrates how to use named sets of SO terms to annotate variants or search for variants. Possible uses include classification of variation by "impact" (e.g., à la SnpEff) and classification of variation by region.
Important definitions: An SO "term" refers to a concept. The proper primary key for a SO term is a SO "id" (e.g., SO:0001147). Each term also has a "name" (natural_variant_site), "definition" ("Describes the natural sequence variants due to polymorphisms..."), and other fields. Names may change (and have changed) for a given SO id; therefore, developers should use SO ids internally.
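For illustration only, a single term record could be sketched as a plain dictionary; the field names below simply mirror the prose above and are not the ontology's own schema:

so_term_example = {
    "id": "SO:0001147",              # the stable primary key; use this internally
    "name": "natural_variant_site",  # human-readable, may change over time
    "definition": "Describes the natural sequence variants due to polymorphisms...",
}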
This notebook generates SO maps, each consisting of a set of annotation labels (as map keys) mapped to sets of SO terms. Each map is named. For example, a SnpEff map might have labels "high", "moderate", and "low", with each label mapping to a set of SO terms.
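As a minimal sketch of that shape (the SO ids are real, but the grouping shown is purely illustrative):

so_map_sketch = {
    "high": ["SO:0001587", "SO:0001589"],  # e.g., stop_gained, frameshift_variant
    "low":  ["SO:0001819"],                # e.g., synonymous_variant
}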
The results here may be compared with the survey of terms over the same region in the "Exploring SO terms" notebook.
In [1]:
import itertools
import pprint
import re
from IPython.display import HTML, display
import ga4gh.client
import prettytable
import requests
print(ga4gh.__version__)
In [2]:
gc = ga4gh.client.HttpClient("http://localhost:8000")
region_constraints = dict(referenceName="1", start=0, end=int(1e10))
variant_set_id = 'YnJjYTE6T1I0Rg'
variant_annotation_sets = list(gc.searchVariantAnnotationSets(variant_set_id))
variant_annotation_set = variant_annotation_sets[0]
print("Using first variant annotation set (of {n} total) for variant set {vs_id}\nvas_id={vas.id}".format(
n=len(variant_annotation_sets), vs_id=variant_set_id, vas=variant_annotation_set))
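When a variant set has more than one annotation set, it may help to list them before picking one. A minimal sketch, relying only on the searchVariantAnnotationSets call and .id field already used above:

for vas in gc.searchVariantAnnotationSets(variant_set_id):
    print(vas.id)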
In [3]:
# poor-man's SO name-to-id map
# so_name_id_map will look like this:
# {u'natural_variant_site': u'SO:0001147',
# u'polypeptide_zinc_ion_contact_site': u'SO:0001103',
# u'methylated_adenine': u'SO:0000161',
# ...
id_name_re = re.compile(r"id: (?P<id>SO:\d+)\nname: (?P<name>\S+)")
url = "https://raw.githubusercontent.com/The-Sequence-Ontology/SO-Ontologies/master/so-xp-simple.obo"
so_name_id_map = {
    m.group('name'): m.group('id')
    for m in (id_name_re.search(s) for s in requests.get(url).text.split("\n\n")) if m is not None
}
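A quick sanity check on the resulting map (a sketch; these are the well-known SO ids for these two names, and the same ids are used in the final search below):

print(so_name_id_map["stop_gained"])         # expected: SO:0001587
print(so_name_id_map["synonymous_variant"])  # expected: SO:0001819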
In [4]:
def mk_effect_id_filter(so_ids=[]):
    """return a list of OntologyTerm effect filters for the given list of SO ids

    >>> print(mk_effect_id_filter("SO:123 SO:456".split()))
    [{'id': 'SO:123'}, {'id': 'SO:456'}]
    """
    return [{"id": id} for id in so_ids]

def remap_with_so_ids(so_name_map):
    """For a map of label => [so names], return a map of label => [so ids]"""
    def _map1(n):
        try:
            return so_name_id_map[n]
        except KeyError:
            print("SO term name '{n}' is not (currently) valid".format(n=n))
            return None
    # drop names that could not be mapped; build a list so len() and join() work later
    return {label: [so_id for so_id in (_map1(n) for n in names) if so_id is not None]
            for label, names in so_name_map.items()}
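A tiny usage sketch; the "demo" label and the misspelled name are made up to show the warning path:

remap_with_so_ids({"demo": ["stop_gained", "not_a_real_so_name"]})
# prints:  SO term name 'not_a_real_so_name' is not (currently) valid
# returns: {'demo': ['SO:0001587']}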
In [5]:
snpeff_so_name_map = {
    "high": [
        "chromosome_large_deletion",
        "chromosome_large_inversion",
        "chromosome_large_duplication",
        "gene_rearrangement",
        "gene_deleted",
        "gene_fusion",
        "gene_fusion_reverese",
        "transcript_deleted",
        "exon_deleted",
        "exon_deleted_partial",
        "exon_duplication",
        "exon_duplication_partial",
        "exon_inversion",
        "exon_inversion_partial",
        "frame_shift",
        "stop_gained",
        "stop_lost",
        "start_lost",
        "splice_site_acceptor",
        "splice_site_donor",
        "rare_amino_acid",
        "protein_protein_interaction_locus",
        "protein_structural_interaction_locus",
    ],
    "moderate": [
        "non_synonymous_coding",
        "codon_insertion",
        "codon_change_plus_codon_insertion",
        "codon_deletion",
        "codon_change_plus_codon_deletion",
        "utr_5_deleted",
        "utr_3_deleted",
        "splice_site_branch_u12",
        "splice_site_region",
        "splice_site_branch",
        "non_synonymous_stop",
        "non_synonymous_start",
        "synonymous_coding",
        "synonymous_start",
        "synonymous_stop",
        "codon_change",
    ],
    "low": [
        "gene_inversion",
        "gene_duplication",
        "transcript_duplication",
        "transcript_inversion",
        "utr_5_prime",
        "utr_3_prime",
        "start_gained",
        "upstream",
        "downstream",
        "motif",
        "motif_deleted",
        "regulation",
        "micro_rna",
    ],
    "modifiers": [
        "custom",
        "next_prot",
        "intron_conserved",
        "intron",
        "intragenic",
        "intergenic_conserved",
        "intergenic",
        "cds",
        "exon",
        "transcript",
        "gene",
        "sequence",
        "chromosome_elongation",
        "chromosome",
        "genome",
        "none",
    ]
}
In [6]:
snpeff_so_id_map = remap_with_so_ids(snpeff_so_name_map)
In [7]:
snpeff_so_id_map
Out[7]:
In [8]:
region_so_name_map = {
    "locus": [
        "gene_fusion",
        "upstream_gene_variant",
    ],
    "cds": [
        "missense_variant",
        "start_lost",
        "stop_gained",
        "stop_lost",
        "synonymous_variant",
    ],
    # note that utr, upstream, and downstream sets overlap intentionally
    "utr": [
        "3_prime_UTR_variant",
        "5_prime_UTR_variant",
    ],
    "upstream": [
        "5_prime_UTR_variant",
        "upstream_gene_variant",
    ],
    "downstream": [
        "3_prime_UTR_variant",
        "downstream_gene_variant",
    ],
}
In [9]:
region_so_id_map = remap_with_so_ids(region_so_name_map)
In [10]:
so_maps = {"snpeff": snpeff_so_id_map, "region": region_so_id_map}
pprint.pprint(so_maps)
In [11]:
field_names = "n_vars name:label n_so_ids so_ids".split()
pt = prettytable.PrettyTable(field_names=field_names)
for name, so_map in so_maps.items():
for label, so_ids in so_map.items():
vs = []
# Searching with an empty filter means no filtering
# This should be changed: searching should be by inclusion, not lack of exclusion.
if len(so_ids)>0:
efilter = mk_effect_id_filter(so_ids)
vs = list(gc.searchVariantAnnotations(variant_annotation_set.id,
effects=efilter,
**region_constraints))
pt.add_row([
len(vs),
name + ":" + label,
len(so_ids),
" ".join(so_ids)
])
display(HTML(pt.get_html_string()))
In [12]:
# invert each SO map: {label: [so_ids]} becomes {so_id: [labels]}
def invert_so_map(so_map):
    """for a so_map of {label: [so_id]}, return the inverse {so_id: [labels]}.
    so_id:label is many:many
    """
    lmap = sorted((so, label) for label, so_ids in so_map.items() for so in so_ids)
    return {k: list(sl[1] for sl in sli) for k, sli in itertools.groupby(lmap, key=lambda e: e[0])}

def unique_labels_for_so_ids(so_labels_map, so_ids):
    """given a map of {so: [labels]} and a list of so_ids, return a list of unique labels"""
    uniq_labels = set(itertools.chain.from_iterable(so_labels_map.get(so_id, []) for so_id in so_ids))
    return list(uniq_labels)

def build_variant_record(v):
    """summarize one variant annotation as a flat dict of HGVS names, SO ids, and map labels
    (relies on so_labels_maps, which is defined in the next cell and must exist before this is called)
    """
    so_ids = list(set(eff.id for te in v.transcriptEffects for eff in te.effects))
    impacts = unique_labels_for_so_ids(so_labels_maps["snpeff"], so_ids)
    regions = unique_labels_for_so_ids(so_labels_maps["region"], so_ids)
    return dict(
        g=v.transcriptEffects[0].hgvsAnnotation.genomic,
        t=v.transcriptEffects[0].hgvsAnnotation.transcript,
        p=v.transcriptEffects[0].hgvsAnnotation.protein,
        so_ids=" ".join(so_ids),
        impacts=" ".join(impacts),
        regions=" ".join(regions)
    )
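A small sketch of the inversion on a toy map, showing the many-to-many behavior (the labels and ids here are made up):

invert_so_map({"a": ["SO:1", "SO:2"], "b": ["SO:2"]})
# returns {'SO:1': ['a'], 'SO:2': ['a', 'b']}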
In [13]:
so_labels_maps = {name: invert_so_map(so_map) for name, so_map in so_maps.items()}
pprint.pprint(so_labels_maps)
In [14]:
# SO:0001587 = stop_gained, SO:0001819 = synonymous_variant
variants = list(gc.searchVariantAnnotations(
    variant_annotation_set.id,
    effects=mk_effect_id_filter("SO:0001587 SO:0001819".split()),
    **region_constraints))
field_names = "g t p so_ids impacts regions".split()
pt = prettytable.PrettyTable(field_names=field_names)
for v in variants:
    vrec = build_variant_record(v)
    pt.add_row([vrec[k] for k in field_names])
display(HTML(pt.get_html_string()))
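As a follow-on sketch (not part of the original notebook), the same annotations can be tallied per label client-side, for example counting how many of the fetched variants carry each region label:

import collections
region_counts = collections.Counter(
    label
    for v in variants
    for label in unique_labels_for_so_ids(
        so_labels_maps["region"],
        [eff.id for te in v.transcriptEffects for eff in te.effects]))
print(region_counts)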
In [ ]: